package com.lianda.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

/**
 * 批处理SQL，Batch SQL API
 */
public class BatchWordCountSQLMain {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment  tEnv = BatchTableEnvironment.create(env);

        DataSet<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 2)
        );

        //注册为表
        tEnv.registerDataSet("WordCount", input, "word, totalNum");
        Table table = tEnv.sqlQuery("SELECT word, SUM(totalNum) as totalNum FROM WordCount GROUP BY word");
        DataSet<WC> result = tEnv.toDataSet(table, WC.class);

        result.print();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class WC {

        /**
         * word
         */
        public String word;

        /**
         * 出现的次数
         */
        public long totalNum;
    }
}
